package com.atguigu.day06;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Example job: sink to Kafka.
 *
 * <p>Builds a stream from a text file via {@code readTextFile} and attaches a
 * {@link FlinkKafkaProducer} as the sink, then executes the job.
 */
public class Example5 {
    /** Input file handed to {@code readTextFile}. */
    private static final String INPUT_PATH =
            "/home/zuoyuan/flink0609/src/main/resources/UserBehavior.csv";

    /** Kafka topic the sink is configured with. */
    private static final String TOPIC = "userbehavior-0609";

    /** Kafka cluster address. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Job-wide parallelism is set to 1.
        environment.setParallelism(1);

        Properties kafkaConfig = kafkaProperties();

        // The source is created first, then the sink is built and attached to it.
        environment
                .readTextFile(INPUT_PATH)
                .addSink(kafkaSink(kafkaConfig));

        environment.execute();
    }

    /** Creates the producer configuration; only the cluster address is set. */
    private static Properties kafkaProperties() {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        return config;
    }

    /**
     * Creates the Kafka sink for {@link #TOPIC}.
     *
     * @param config producer configuration passed through to the sink
     * @return a producer using {@link SimpleStringSchema}, i.e. the data written is of type String
     */
    private static FlinkKafkaProducer<String> kafkaSink(Properties config) {
        return new FlinkKafkaProducer<String>(TOPIC, new SimpleStringSchema(), config);
    }
}
